package com.hzya.frame;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.logging.ComponentLog;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;

/**
 * NiFi processor that makes sure the table named by the {@code targetInsertTableName}
 * FlowFile attribute exists in the database reached through the configured DBCP service.
 *
 * <p>When the table is missing, a {@code CREATE TABLE} statement is derived from the JSON
 * content of the FlowFile (every field becomes a VARCHAR-style column) and executed.
 * When the table already exists, or when {@code autoTableCreation} is {@code "N"},
 * the FlowFile is passed through to {@code success} untouched.
 *
 * <p>Author: liuyang<br>
 * Package: com.hzya.frame<br>
 * Project: nifi-hzyadev-bundle<br>
 * Name / file name: DevAutoJsonTableCreateProcessor<br>
 * Date: 2025/7/10 17:24
 */
@Tags({"sql", "ddl", "json", "database", "schema", "ensure", "create", "mysql", "oracle", "sqlserver"})
@CapabilityDescription("连接到指定的数据库，并确保目标表存在。如果表不存在，则根据FlowFile的JSON内容生成并执行CREATE TABLE语句。" + "如果表已存在，则不执行任何操作。成功或跳过时，将原始FlowFile路由到success关系。")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttributes({
        @ReadsAttribute(attribute = "targetInsertTableName", description = "要检查或创建的目标表的名称。"),
        @ReadsAttribute(attribute = "autoTableCreation", description = "是否自动建表的开关；值为 'N' 时跳过建表，直接将FlowFile路由到success。"),
        @ReadsAttribute(attribute = "jsonPrimaryKeyTag", description = "一个JSON数组字符串，指定主键字段，例如 '[\"id\", \"user_name\"]'。")
})
@WritesAttributes({
        @WritesAttribute(attribute = "ddl.sql.executed", description = "如果执行了CREATE TABLE语句，该语句将被写入此属性用于审计。"),
        @WritesAttribute(attribute = "ddl.database.type", description = "检测到的数据库类型 (mysql, sqlserver, or oracle)。")
})
public class DevAutoJsonTableCreateProcessor extends AbstractProcessor {
    // --- Properties ---
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder().name("Database Connection Pooling Service").displayName("数据库连接池服务").description("用于连接数据库、检查和创建表的DBCPConnectionPool控制器服务。").identifiesControllerService(DBCPService.class).required(true).build();

    // --- Relationships ---
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("当表已存在或成功创建时，原始FlowFile将被路由到此关系。").build();

    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("在处理过程中发生任何错误（如数据库连接失败、SQL执行失败）时，FlowFile将被路由到此关系。").build();

    // Internal identifiers for the supported database families (see getDbType).
    private static final String DB_MYSQL = "mysql";
    private static final String DB_SQLSERVER = "sqlserver";
    private static final String DB_ORACLE = "oracle";

    // Column sizing: a column is declared 10x as wide as the sample value, never below 10.
    private static final long MIN_COLUMN_LENGTH = 10L;
    private static final long LENGTH_HEADROOM_FACTOR = 10L;

    // Upper bounds for a VARCHAR(n) declaration, so that long sample values do not yield invalid DDL.
    private static final int ORACLE_MAX_VARCHAR = 4000;
    private static final int SQLSERVER_MAX_VARCHAR = 8000;
    // 16383 characters is the per-column ceiling under utf8mb4, the most restrictive common MySQL charset.
    private static final int MYSQL_MAX_VARCHAR = 16383;

    private Set<Relationship> relationships;
    private List<PropertyDescriptor> descriptors;
    // Used to parse the JSON content of the FlowFile.
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Builds the immutable property and relationship collections exposed by this processor.
     *
     * @param context initialization context supplied by the framework (not used here)
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> supportedProperties = new ArrayList<>();
        supportedProperties.add(DBCP_SERVICE);
        this.descriptors = Collections.unmodifiableList(supportedProperties);

        final Set<Relationship> supportedRelationships = new HashSet<>();
        supportedRelationships.add(REL_SUCCESS);
        supportedRelationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /**
     * @return the unmodifiable set containing the success and failure relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable list of supported properties (only the DBCP service)
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /**
     * Validates the FlowFile attributes, checks whether the target table exists and, when it does
     * not, creates it from the FlowFile's JSON content.
     *
     * <p>Routing: {@code success} when creation is disabled ({@code autoTableCreation=N}), when the
     * table already exists, or when the DDL ran successfully; {@code failure} (penalized) when a
     * required attribute is missing, {@code jsonPrimaryKeyTag} cannot be parsed, or any database or
     * content error occurs.
     *
     * @param context provides the configured DBCP controller service
     * @param session session used to fetch, read, update and route the FlowFile
     * @throws ProcessException declared by the framework contract
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();

        // --- 1. Read and validate the FlowFile attributes ---
        final String tableName = flowFile.getAttribute("targetInsertTableName");
        if (tableName == null || tableName.trim().isEmpty()) {
            logger.error("属性 'targetInsertTableName' 缺失或为空，FlowFile {} 将被路由到 failure。", flowFile);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        final String autoTableCreation = flowFile.getAttribute("autoTableCreation");
        if (autoTableCreation == null || autoTableCreation.trim().isEmpty()) {
            logger.error("属性 'autoTableCreation' 缺失或为空，FlowFile {} 将被路由到 failure。", flowFile);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }
        if ("N".equals(autoTableCreation)) {
            // The message has a single placeholder, so only the FlowFile is passed as an argument.
            logger.info("autoTableCreation=N（取消自动建表）。跳过创建步骤，直接将 FlowFile {} 路由到 success。", flowFile);
            session.transfer(flowFile, REL_SUCCESS);
            return;
        }

        final String jsonPrimaryKeyTag = flowFile.getAttribute("jsonPrimaryKeyTag");
        final List<String> primaryKeys;
        try {
            // The attribute is parsed by hand; see parseJsonArrayString.
            primaryKeys = parseJsonArrayString(jsonPrimaryKeyTag);
            if (primaryKeys.isEmpty()) {
                logger.warn("属性 'jsonPrimaryKeyTag' 为空或解析后无内容，将创建没有主键的表。FlowFile: {}", flowFile);
            }
        } catch (Exception e) {
            logger.error("解析 'jsonPrimaryKeyTag' 失败: '{}'. 错误: {}. FlowFile {} 将被路由到 failure。", jsonPrimaryKeyTag, e.getMessage(), flowFile);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            return;
        }

        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);

        // --- 2. Connect, check for the table, and create it when missing ---
        try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) {
            final String dbType = getDbType(conn);

            if (tableExists(conn, tableName, dbType)) {
                logger.info("表 '{}' 已存在。跳过创建步骤，直接将 FlowFile {} 路由到 success。", tableName, flowFile);
                session.transfer(flowFile, REL_SUCCESS);
                return;
            }

            logger.info("表 '{}' 不存在。准备根据 FlowFile {} 的内容生成并执行 DDL。", tableName, flowFile);

            final String generatedSql = buildDdlFromContent(session, flowFile, dbType, tableName, primaryKeys);
            if (generatedSql == null || generatedSql.isEmpty()) {
                logger.error("生成的 DDL 为空，无法执行。FlowFile {} 将被路由到 failure。", flowFile);
                session.transfer(session.penalize(flowFile), REL_FAILURE);
                return;
            }

            try (final Statement statement = conn.createStatement()) {
                statement.execute(generatedSql);
                logger.info("成功执行 DDL 创建了表 '{}'。SQL: {}", tableName, generatedSql);
                // Record what was executed so the flow can audit it downstream.
                flowFile = session.putAttribute(flowFile, "ddl.sql.executed", generatedSql);
                flowFile = session.putAttribute(flowFile, "ddl.database.type", dbType);
                session.transfer(flowFile, REL_SUCCESS);
            } catch (final SQLException e) {
                logger.error("执行 DDL 失败。SQL: [{}]. FlowFile {} 将被路由到 failure。", generatedSql, flowFile, e);
                session.transfer(session.penalize(flowFile), REL_FAILURE);
            }

        } catch (Exception e) {
            logger.error("在确保表 '{}' 存在时发生未知错误。FlowFile {} 将被路由到 failure。", tableName, flowFile, e);
            session.transfer(session.penalize(flowFile), REL_FAILURE);
        }
    }

    /**
     * Reads the FlowFile content as JSON and turns its fields into a CREATE TABLE statement.
     *
     * <p>If the root is a non-empty array its first element is used, otherwise the root itself.
     * The selected node must be a JSON object with at least one field; otherwise the read
     * callback fails with an {@link IOException}.
     *
     * @param session     session used to read the FlowFile content
     * @param flowFile    FlowFile whose content describes the columns
     * @param dbType      database family as returned by {@link #getDbType(Connection)}
     * @param tableName   name of the table to create
     * @param primaryKeys lower-cased primary-key field names requested by the flow
     * @return the generated DDL statement
     */
    private String buildDdlFromContent(final ProcessSession session, final FlowFile flowFile, final String dbType, final String tableName, final List<String> primaryKeys) {
        final Holder<String> sqlHolder = new Holder<>();
        session.read(flowFile, in -> {
            final JsonNode rootNode = objectMapper.readTree(in);
            final JsonNode targetNode = (rootNode != null && rootNode.isArray() && rootNode.size() > 0) ? rootNode.get(0) : rootNode;

            // A null tree (possible for empty content) is rejected the same way as a non-object.
            if (targetNode == null || !targetNode.isObject()) {
                throw new IOException("JSON 内容不是一个有效的对象结构。");
            }

            final Map<String, String> columnDefinitions = new LinkedHashMap<>();
            final List<String> foundPrimaryKeys = new ArrayList<>();
            final Iterator<Map.Entry<String, JsonNode>> fields = targetNode.fields();
            while (fields.hasNext()) {
                final Map.Entry<String, JsonNode> field = fields.next();
                // Whitespace is stripped from JSON keys to obtain the column name.
                final String fieldName = field.getKey().replaceAll("\\s+", "");

                columnDefinitions.put(fieldName, getColumnTypeForDB(dbType, estimateColumnLength(dbType, field.getValue())));

                // primaryKeys holds lower-cased names, so compare case-insensitively. Two JSON keys
                // can collapse to the same column name once whitespace is removed; list it only once.
                if (primaryKeys.contains(fieldName.toLowerCase(Locale.ROOT)) && !foundPrimaryKeys.contains(fieldName)) {
                    foundPrimaryKeys.add(fieldName);
                }
            }

            if (columnDefinitions.isEmpty()) {
                throw new IOException("从JSON中未能解析出任何字段。");
            }

            sqlHolder.set(generateCreateTableSql(dbType, tableName, columnDefinitions, foundPrimaryKeys));
        });
        return sqlHolder.get();
    }

    /**
     * Chooses a column width from a sample value: ten times the text length, at least 10,
     * and never more than the database's VARCHAR limit.
     *
     * @param dbType    database family as returned by {@link #getDbType(Connection)}
     * @param valueNode sample value; may be {@code null} or a JSON null
     * @return the width to declare for the column
     */
    private int estimateColumnLength(final String dbType, final JsonNode valueNode) {
        // A JSON null must be checked explicitly, because its asText() is the literal "null".
        final String text = (valueNode == null || valueNode.isNull()) ? null : valueNode.asText();
        final long sampleLength = (text == null) ? 0L : text.length();
        // Computed in long so that an extremely long sample cannot overflow int.
        final long desired = Math.max(MIN_COLUMN_LENGTH, sampleLength * LENGTH_HEADROOM_FACTOR);
        return (int) Math.min(desired, maxVarcharLength(dbType));
    }

    /**
     * @param dbType database family as returned by {@link #getDbType(Connection)}
     * @return the largest width this processor will declare for a VARCHAR column on that database
     */
    private static int maxVarcharLength(final String dbType) {
        switch (dbType) {
            case DB_ORACLE:
                return ORACLE_MAX_VARCHAR;
            case DB_SQLSERVER:
                return SQLSERVER_MAX_VARCHAR;
            case DB_MYSQL:
                return MYSQL_MAX_VARCHAR;
            default:
                return Integer.MAX_VALUE;
        }
    }

    /**
     * Parses a JSON-array-like string by hand, without a JSON library.
     * For example {@code ["key1", "key2"]} yields the list {@code [key1, key2]}.
     *
     * <p>Elements are separated on commas, trimmed, stripped of one pair of surrounding double
     * quotes, lower-cased, and dropped when empty.
     *
     * @param jsonArrayStr string holding the primary-key names; may be {@code null} or blank
     * @return the lower-cased key names, or an empty list when there are none
     * @throws IllegalArgumentException when a non-blank value is not wrapped in square brackets
     */
    private List<String> parseJsonArrayString(String jsonArrayStr) {
        if (jsonArrayStr == null) {
            return Collections.emptyList();
        }
        final String trimmed = jsonArrayStr.trim();
        if (trimmed.isEmpty()) {
            return Collections.emptyList();
        }
        if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
            throw new IllegalArgumentException("无效的格式：字符串必须以'['开头并以']'结尾。");
        }
        // "[]" (or a lone bracket) carries no elements.
        if (trimmed.length() <= 2) {
            return Collections.emptyList();
        }

        // Drop the surrounding brackets.
        final String content = trimmed.substring(1, trimmed.length() - 1).trim();
        if (content.isEmpty()) {
            return Collections.emptyList();
        }

        final List<String> keys = new ArrayList<>();
        for (final String part : content.split(",")) {
            String element = part.trim();
            // Strip one pair of surrounding double quotes; a lone quote character becomes empty.
            if (element.startsWith("\"") && element.endsWith("\"")) {
                element = element.length() > 1 ? element.substring(1, element.length() - 1) : "";
            }
            if (!element.isEmpty()) {
                // Lower-cased so later comparisons against JSON field names ignore case.
                keys.add(element.toLowerCase(Locale.ROOT));
            }
        }
        return keys;
    }


    /**
     * Maps the JDBC product name of the connection to one of the supported database families.
     *
     * @param conn open database connection
     * @return {@code "mysql"}, {@code "sqlserver"} or {@code "oracle"}
     * @throws SQLException     when the metadata cannot be read
     * @throws ProcessException when the product is none of the supported databases
     */
    private String getDbType(final Connection conn) throws SQLException {
        // Locale.ROOT keeps the comparison independent of the JVM's default locale.
        final String dbProductName = conn.getMetaData().getDatabaseProductName().toLowerCase(Locale.ROOT);
        if (dbProductName.contains("mysql")) {
            return DB_MYSQL;
        }
        if (dbProductName.contains("microsoft sql server")) {
            return DB_SQLSERVER;
        }
        if (dbProductName.contains("oracle")) {
            return DB_ORACLE;
        }
        throw new ProcessException("不支持的数据库类型: " + dbProductName);
    }

    /**
     * Checks through JDBC metadata whether a table with the given name exists in the
     * connection's current catalog and schema.
     *
     * @param conn      open database connection
     * @param tableName table to look for
     * @param dbType    database family; for Oracle the name is upper-cased before the lookup
     * @return {@code true} when a table with exactly that name (ignoring case) is reported
     * @throws SQLException when the metadata lookup fails
     */
    private boolean tableExists(final Connection conn, final String tableName, final String dbType) throws SQLException {
        final DatabaseMetaData metaData = conn.getMetaData();
        final String lookupName = DB_ORACLE.equals(dbType) ? tableName.toUpperCase(Locale.ROOT) : tableName;
        try (ResultSet rs = metaData.getTables(conn.getCatalog(), conn.getSchema(), lookupName, new String[]{"TABLE"})) {
            // getTables treats the name as a LIKE pattern, so '_' and '%' act as wildcards and
            // "user_info" would also match "userXinfo". Only an exact name counts as existing.
            while (rs.next()) {
                if (lookupName.equalsIgnoreCase(rs.getString("TABLE_NAME"))) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Renders the column type for the given database.
     *
     * @param dbType database family
     * @param length declared width
     * @return {@code VARCHAR2(n BYTE)} for Oracle, {@code VARCHAR(n)} otherwise
     */
    private String getColumnTypeForDB(String dbType, int length) {
        if (DB_ORACLE.equals(dbType)) {
            return "VARCHAR2(" + length + " BYTE)";
        }
        return "VARCHAR(" + length + ")";
    }

    /**
     * Assembles the CREATE TABLE statement.
     *
     * <p>Identifiers are quoted with the database's quoting style; for Oracle the table,
     * column and constraint names are upper-cased first. Primary-key columns are declared
     * NOT NULL and collected into a constraint named {@code PK_<table>}, with characters
     * outside {@code [a-zA-Z0-9_]} replaced by {@code _} and the name cut to 30 characters.
     *
     * @param dbType            database family
     * @param tableName         table to create
     * @param columnDefinitions column name to column type, in declaration order
     * @param primaryKeys       primary-key column names, spelled as the keys of
     *                          {@code columnDefinitions}; may be {@code null} or empty
     * @return the DDL text
     * @throws ProcessException when {@code dbType} is not supported
     */
    private String generateCreateTableSql(String dbType, String tableName, Map<String, String> columnDefinitions, List<String> primaryKeys) {
        final String quoteChar;
        final boolean upperCaseIdentifiers;
        switch (dbType) {
            case DB_MYSQL:
                quoteChar = "`";
                upperCaseIdentifiers = false;
                break;
            case DB_SQLSERVER:
                quoteChar = "[";
                upperCaseIdentifiers = false;
                break;
            case DB_ORACLE:
                quoteChar = "\"";
                upperCaseIdentifiers = true;
                break;
            default:
                throw new ProcessException("不支持的数据库类型: " + dbType);
        }

        final boolean hasPrimaryKeys = primaryKeys != null && !primaryKeys.isEmpty();

        final List<String> columnLines = new ArrayList<>(columnDefinitions.size());
        for (final Map.Entry<String, String> column : columnDefinitions.entrySet()) {
            final String originalName = column.getKey();
            final String emittedName = upperCaseIdentifiers ? originalName.toUpperCase(Locale.ROOT) : originalName;
            // primaryKeys carries the same spelling as the map keys, so compare the original name.
            final String notNull = (hasPrimaryKeys && primaryKeys.contains(originalName)) ? " NOT NULL" : "";
            columnLines.add("    " + getQuotedIdentifier(emittedName, quoteChar) + " " + column.getValue() + notNull);
        }

        final String emittedTableName = upperCaseIdentifiers ? tableName.toUpperCase(Locale.ROOT) : tableName;
        final StringBuilder sql = new StringBuilder("CREATE TABLE ")
                .append(getQuotedIdentifier(emittedTableName, quoteChar))
                .append(" (\n")
                .append(String.join(",\n", columnLines));

        if (hasPrimaryKeys) {
            final List<String> pkColumns = new ArrayList<>(primaryKeys.size());
            for (final String pk : primaryKeys) {
                pkColumns.add(getQuotedIdentifier(upperCaseIdentifiers ? pk.toUpperCase(Locale.ROOT) : pk, quoteChar));
            }

            String constraintName = "PK_" + tableName.replaceAll("[^a-zA-Z0-9_]", "_");
            if (constraintName.length() > 30) {
                constraintName = constraintName.substring(0, 30);
            }
            if (upperCaseIdentifiers) {
                constraintName = constraintName.toUpperCase(Locale.ROOT);
            }

            sql.append(",\n")
                    .append("    CONSTRAINT ").append(getQuotedIdentifier(constraintName, quoteChar))
                    .append(" PRIMARY KEY (").append(String.join(", ", pkColumns)).append(")");
        }

        sql.append("\n)");
        return sql.toString();
    }

    /**
     * Wraps an identifier in the given quote style, doubling any closing-quote character
     * inside it.
     *
     * @param identifier raw identifier
     * @param quoteChar  opening quote; {@code "["} is closed with {@code "]"}, any other
     *                   value is used for both ends
     * @return the quoted identifier
     */
    private String getQuotedIdentifier(String identifier, String quoteChar) {
        final String closingQuote = "[".equals(quoteChar) ? "]" : quoteChar;
        return quoteChar + identifier.replace(closingQuote, closingQuote + closingQuote) + closingQuote;
    }

    /**
     * Minimal mutable box that lets the content-read callback hand a value back to its caller.
     *
     * @param <T> type of the held value
     */
    private static class Holder<T> {
        private T value;

        void set(T value) {
            this.value = value;
        }

        T get() {
            return value;
        }
    }
}